Expose streaming API #1013

Open. Wants to merge 16 commits into base: main


s0l0ist
Contributor

@s0l0ist s0l0ist commented Jul 11, 2025

📬 Issue #, if available:

The SDK supports streaming, but lambda_http::Adapter only handles buffered responses from Axum's Router (i.e., a Service). lambda_http::run_with_streaming_response is available, but it doesn't let you specify a custom runtime, which is necessary if you want to support OpenTelemetry.

Related (ish):

✍️ Description of changes:

This PR exposes:

  • StreamAdapter: Converts a tower Service into an AWS Lambda streaming response.

I have also added two new examples that show how to use Axum with run_with_streaming_response and with StreamAdapter + OTel.

This is how you can use it with a custom runtime supporting OTel:

use crate::{error::BoxError, service::build_service};
use axum::Router;
use lambda_runtime::{
    Runtime,
    layers::{OpenTelemetryFaasTrigger, OpenTelemetryLayer},
};
use opentelemetry_sdk::trace as sdktrace;

/// The main function to run the AWS Lambda
pub async fn async_main(tracer_provider: sdktrace::SdkTracerProvider) -> Result<(), BoxError> {
    let (app, _): (Router, _) = build_service().await;

    // For a buffered response:
    // let handler = lambda_http::Adapter::from(app);
    // For a streamed response:
    let handler = lambda_http::StreamAdapter::from(app);

    let runtime = Runtime::new(handler).layer(
        // Create a tracing span for each Lambda invocation
        OpenTelemetryLayer::new(|| {
            if let Err(err) = tracer_provider.force_flush() {
                eprintln!("Error flushing traces: {:#?}", err);
            }
        })
        .with_trigger(OpenTelemetryFaasTrigger::Http),
    );
    runtime.run().await
}

🔏 By submitting this pull request

  • I confirm that I've run cargo +nightly fmt.
  • I confirm that I've run cargo clippy --fix.
  • I confirm that I've made a best effort attempt to update all relevant documentation.
  • I confirm that my contribution is made under the terms of the Apache 2.0 license.

@jlizen
Contributor

jlizen commented Jul 11, 2025

Hi @s0l0ist, thanks for cutting this PR. This change seems very reasonable to me.

I do have concerns about leaking internals. This only names return types that are already public, but we might want to tweak their composition or otherwise shift bounds in ways that break callers. I think it would be best to type-erase the returned service stack.

The easiest way to do that would be via a tower::util::BoxService / BoxLayer (or the sync/clone-bounded variants, though I don't think we need them here - but we could certainly add alternate APIs that do include the sync/clone bounds if it would be useful for how the returned struct is used by a runtime).

That adds a small amount of performance overhead due to an extra allocation and a layer of dynamic dispatch. But I think the ergonomics would be much better compared to a more complex, composable builder-style API using generics and sealed inner layer types. Note that we would probably want some sort of into_streaming_response_inner() that does what into_streaming_response() does today, so that our run_with_streaming_response can skip the minor type-erasure overhead.
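
To make the trade-off concrete, here is a minimal, std-only sketch of type erasure. The `Handler` trait, the `Echo` and `Uppercase` types, and both builder functions are hypothetical stand-ins for `tower::Service` and the real service stack; none of them come from the crate.

```rust
/// Hypothetical stand-in for a service trait (not `tower::Service`).
pub trait Handler {
    fn handle(&mut self, request: &str) -> String;
}

/// Innermost handler: echoes the request back.
pub struct Echo;

impl Handler for Echo {
    fn handle(&mut self, request: &str) -> String {
        request.to_string()
    }
}

/// A layer that upper-cases whatever the wrapped handler returns.
pub struct Uppercase<H>(pub H);

impl<H: Handler> Handler for Uppercase<H> {
    fn handle(&mut self, request: &str) -> String {
        self.0.handle(request).to_uppercase()
    }
}

/// Leaky: the signature names the whole stack, so reordering or swapping a
/// layer is a breaking change for callers that spell out the type.
pub fn build_concrete() -> Uppercase<Echo> {
    Uppercase(Echo)
}

/// Type-erased: callers only see a boxed trait object. This costs one
/// allocation plus dynamic dispatch per call, but the stack can change freely.
pub fn build_erased() -> Box<dyn Handler + Send> {
    Box::new(build_concrete())
}
```

Both builders behave identically at the call site; only the erased one keeps the layer composition out of the public signature.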

How does that sound to you?

It would also be great to get a small example showing usage of this with a non-tokio runtime, if you'd be open to it! That would both let us validate the API, and make it more discoverable for users. I'd probably be ok with this landing without that, though, if you don't have cycles.

@s0l0ist
Contributor Author

s0l0ist commented Jul 19, 2025

@jlizen, I think that's the right direction. I'm busy right now, but I can revisit this PR in a couple of weeks.

@s0l0ist
Contributor Author

s0l0ist commented Aug 10, 2025

@jlizen - does this API work? I'll add an example or two if I have time, but wanted to make sure this was aligned beforehand.

Contributor

@jlizen jlizen left a comment

General approach looks good, but left a note around newtyping the BoxService so we don't lock ourselves into extra Service: Send + 'static bounds for all time. (For now specifying them in our API ourselves is fine, just want to leave the door open to removing them without type breakage).
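
The newtype idea can be sketched without any dependencies. In this hypothetical example, `Handler` stands in for `tower::Service`, and `StreamService` and `Greeter` are illustrative names, not types from the crate. The boxed handler is a private field, so callers never spell out the box or its bounds in their own code.

```rust
/// Hypothetical stand-in for a service trait (not `tower::Service`).
pub trait Handler {
    fn handle(&mut self, request: &str) -> String;
}

/// Newtype around the boxed handler. The field is private, so callers never
/// name `Box<dyn Handler + Send>` and the representation can change later.
pub struct StreamService {
    inner: Box<dyn Handler + Send + 'static>,
}

impl StreamService {
    /// The `Send + 'static` bounds are stated here, on the constructor,
    /// rather than in a public alias that every caller would depend on.
    pub fn new<H>(handler: H) -> Self
    where
        H: Handler + Send + 'static,
    {
        Self { inner: Box::new(handler) }
    }
}

impl Handler for StreamService {
    fn handle(&mut self, request: &str) -> String {
        self.inner.handle(request)
    }
}

/// Example handler used only for this sketch.
pub struct Greeter;

impl Handler for Greeter {
    fn handle(&mut self, request: &str) -> String {
        format!("hello, {request}")
    }
}
```

A caller would write `StreamService::new(Greeter)` and use the result like any other handler.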

I'm seeing that we don't have ANY tests currently of the run_with_streaming_response() API... that is unfortunate... An example would be really great if you have the time (which would double as a test that things compile at least). And then you could trivially validate e2e with cargo-lambda at that point. (Or feel free to throw in an integration test if you want our CI/CD to probe the e2e, up to you).

@s0l0ist
Contributor Author

s0l0ist commented Aug 16, 2025

Hi @jlizen, I think I may have a better approach.

I noticed that Adapter handles buffered responses by wrapping a Service<Request> and producing a LambdaResponse from anything that implements IntoResponse. Taking inspiration from that, I built a StreamAdapter that wraps a Service<Request, Response = Response<B>> and produces a StreamResponse<BodyStream<B>>.

This lets us remove the BoxService entirely while keeping the same ergonomics. The trade-off is a heap-allocated future per request (same as Adapter), but we avoid exposing BoxService in the public API.
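
As a rough illustration of the "heap-allocated future per request" trade-off, the std-only sketch below wraps a buffered handler in an adapter whose future type is a boxed trait object. Everything here is an assumption made for illustration: `Handler` is a simplified stand-in for `tower::Service`, `Buffered` and `ChunkAdapter` are hypothetical types, and `block_on` is a tiny busy-polling executor that exists only so the sketch can run without an async runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified service trait (not `tower::Service`).
pub trait Handler {
    type Response;
    type Future: Future<Output = Self::Response>;
    fn call(&mut self, request: String) -> Self::Future;
}

/// A buffered handler whose future resolves to the whole body at once.
pub struct Buffered;

impl Handler for Buffered {
    type Response = Vec<u8>;
    type Future = std::future::Ready<Vec<u8>>;
    fn call(&mut self, request: String) -> Self::Future {
        std::future::ready(request.into_bytes())
    }
}

/// Adapter that turns a buffered body into a sequence of chunks.
pub struct ChunkAdapter<H> {
    inner: H,
    chunk_size: usize,
}

impl<H> ChunkAdapter<H> {
    pub fn new(inner: H, chunk_size: usize) -> Self {
        // `chunks` panics on a size of zero, so clamp to at least one byte.
        Self { inner, chunk_size: chunk_size.max(1) }
    }
}

impl<H> Handler for ChunkAdapter<H>
where
    H: Handler<Response = Vec<u8>>,
    H::Future: Send + 'static,
{
    type Response = Vec<Vec<u8>>;
    // One heap allocation per request: the price for a nameable future type.
    type Future = Pin<Box<dyn Future<Output = Vec<Vec<u8>>> + Send>>;

    fn call(&mut self, request: String) -> Self::Future {
        let fut = self.inner.call(request);
        let chunk_size = self.chunk_size;
        Box::pin(async move {
            let body = fut.await;
            let chunks: Vec<Vec<u8>> = body.chunks(chunk_size).map(|c| c.to_vec()).collect();
            chunks
        })
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for this sketch only: polls in a loop until ready.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

With a chunk size of 4, `block_on(ChunkAdapter::new(Buffered, 4).call("streaming".to_string()))` yields the chunks `stre`, `amin`, and `g`.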

With this change, run_with_streaming_response just mirrors the non-streaming run:

pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
where
    S: Service<Request, Response = Response<B>, Error = E>,
    S::Future: Send + 'a,
    B: Body + Unpin + Send + 'static,
    B::Data: Into<Bytes> + Send,
    B::Error: Into<Error> + Send + Debug,
{
    lambda_runtime::run(StreamAdapter::from(handler)).await
}

And I can selectively choose which one I want like this:

/// The main function to run the AWS Lambda
pub async fn async_main(tracer_provider: sdktrace::SdkTracerProvider) -> Result<(), BoxError> {
    let (app, _) = build_service().await;

    // For a buffered response:
    // let handler = lambda_http::Adapter::from(app);
    // For a streamed response:
    let handler = lambda_http::StreamAdapter::from(app);

    let runtime = Runtime::new(handler).layer(
        // Create a tracing span for each Lambda invocation
        OpenTelemetryLayer::new(|| {
            if let Err(err) = tracer_provider.force_flush() {
                eprintln!("Error flushing traces: {err:#?}");
            }
        })
        .with_trigger(OpenTelemetryFaasTrigger::Http),
    );
    runtime.run().await
}

@s0l0ist s0l0ist requested a review from jlizen August 16, 2025 12:01
Contributor

@jlizen jlizen left a comment

I like the direction you went with this for the new API.

But I would prefer to have somewhat duplicated code if that is what is needed to avoid adding a new allocation to the existing API, especially given that there is no additional benefit to the caller. I.e., let's leave the run_with_streaming_response API not using the StreamAdapter.

Presumably a decent chunk of it could be pulled out into a pure function to avoid too much duplication.

@s0l0ist
Contributor Author

s0l0ist commented Aug 17, 2025

Okay I think I have it:

  • Add into_stream_service(...): a zero-alloc builder (no boxed future/vtable) that is used by run_with_streaming_response so that it keeps exactly the same behavior.
  • Keep StreamAdapter for ergonomic stacking; it still returns a boxed future for API stability.
  • Factor common logic into into_stream_response(...), which is used by both StreamAdapter and into_stream_service.
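
The split described in the list above can be sketched in a simplified, synchronous form. The names mirror those in the comment, but the signatures and the `StreamResponse` type are assumptions made for illustration and are much simpler than the real ones: closures stand in for services, and a boxed closure stands in for the boxed future.

```rust
/// Hypothetical response type for this sketch (not the crate's type).
#[derive(Debug, PartialEq)]
pub struct StreamResponse {
    pub status: u16,
    pub chunks: Vec<String>,
}

/// Shared, pure conversion used by both entry points below.
pub fn into_stream_response(status: u16, body: &str) -> StreamResponse {
    StreamResponse {
        status,
        chunks: body.split_whitespace().map(str::to_owned).collect(),
    }
}

/// Unboxed path: returns an unnameable closure with no heap allocation and
/// no vtable, so the wrapper itself adds no dynamic dispatch.
pub fn into_stream_service<F>(mut handler: F) -> impl FnMut(&str) -> StreamResponse
where
    F: FnMut(&str) -> (u16, String),
{
    move |request: &str| {
        let (status, body) = handler(request);
        into_stream_response(status, &body)
    }
}

/// Ergonomic path: a nameable type that hides the handler behind a box.
pub struct StreamAdapter {
    inner: Box<dyn FnMut(&str) -> StreamResponse + Send>,
}

impl StreamAdapter {
    pub fn new<F>(mut handler: F) -> Self
    where
        F: FnMut(&str) -> (u16, String) + Send + 'static,
    {
        Self {
            inner: Box::new(move |request: &str| {
                let (status, body) = handler(request);
                into_stream_response(status, &body)
            }),
        }
    }

    pub fn call(&mut self, request: &str) -> StreamResponse {
        (self.inner)(request)
    }
}
```

Because both paths delegate to the same pure function, they produce identical responses for the same handler; they differ only in whether the wrapper is boxed.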

I added a test parallel to the one Adapter already had, ensuring we can compile-check a BoxService built the same way. Once we’re happy with the shape, I’ll follow up with more examples. I've already added the http-axum-streaming example.

@s0l0ist s0l0ist requested a review from jlizen August 17, 2025 11:26
Contributor

@jlizen jlizen left a comment

Yep, I think you nailed the API! Thanks for all the iterations.

Looks like clippy/rustfmt runs are needed, lint CI is failing.

I'd be happy to merge it after those go green and cut a release, or I can wait if you're planning on adding more examples to this PR. Let me know your preference!

/// the user may require additional middleware between `lambda_runtime::run`
/// and where the `LambdaEvent` is converted into a `Request`.
#[test]
fn stream_adapter_is_boxable() {
Contributor

@jlizen jlizen Aug 17, 2025

Minor: Since this isn't making any assertions anyway, and is mostly testing that it compiles, would this perhaps be better suited as an example? Seems like a nice simple example of setting up custom middleware? Also would help capture any weirdness with private vs public API.

Up to you, I'm happy to have it here too.

Contributor Author

Yeah can do! I think I was mostly just trying to mimic the same test for the Adapter. I can move this out into the example which would effectively test this.

@s0l0ist
Contributor Author

s0l0ist commented Aug 17, 2025

Yes I see the CI checks failing - they're unrelated to this PR, but happy to clean them up.

@jlizen
Contributor

jlizen commented Aug 17, 2025

Ah, ok, I didn't actually look at them - probably we got hit by some new clippy/rustfmt lints across toolchain versions. If you have time to clean them up, feel free / much appreciated (separate commit please).

It sounded like you did plan to hoist that test into an example, so I can sit tight on merging since it sounds like a new commit is coming. But just ping me if you want to ship it without another round of changes.

@s0l0ist
Contributor Author

s0l0ist commented Aug 18, 2025

@jlizen, fixed CI here.

And yes, I plan on moving the test into the new example and that should wrap this PR up.

@s0l0ist
Contributor Author

s0l0ist commented Aug 18, 2025

@jlizen - Okay! I think we're ready :)

I added two examples: one that showcases run_with_streaming_response and another that demonstrates configuring a custom runtime with OTel using StreamAdapter. Both convert the service into a type-erased BoxService (effectively moving that assertion-less test into the examples), but they do so before converting to a streaming service. I didn't have a good example showing the streaming service itself being converted into a BoxService. I can add back the unit test if you want.

I've tested manually e2e to verify everything works.

@s0l0ist s0l0ist requested a review from jlizen August 18, 2025 17:53